{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "8fa68386",
   "metadata": {},
   "source": [
    "# Deploying multiple MCP services with Ray Serve\n",
    "This tutorial deploys two MCP services, Brave Search and Fetch, using Ray Serve. It relies on autoscaling, fractional CPU allocation, and multi-service routing.\n",
    "\n",
    "Combined with Anyscale, this setup allows you to run production-grade services with minimal overhead, auto-provision compute as needed, and deploy updates without downtime. Whether you're scaling up a single model or routing across many, this pattern provides a clean, extensible path to deployment.\n",
    "\n",
    "To add more MCP services, call `build_mcp_deployment` for each new service and bind the result in the router.\n",
    "\n",
    "The following architecture diagram illustrates deploying multiple MCP Docker images with Ray Serve:\n",
    "\n",
    "<img\n",
    "  src=\"https://agent-and-mcp.s3.us-east-2.amazonaws.com/mcp/diagrams/multiple_mcp_docker_to_http_service.png\"\n",
    "  alt=\"Deploy Multiple MCP Docker Images with Ray Serve Architecture\"\n",
    "  style=\"width:45%; display: block; margin: 0 auto;\"\n",
    "/>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f60fee14",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "- Ray [Serve], already included in the base Docker image\n",
    "- Podman\n",
    "- A Brave API key set in your environment (`BRAVE_API_KEY`)\n",
    "- MCP Python library \n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "826cc042",
   "metadata": {},
   "source": [
    "### Dependencies\n",
    "\n",
    "**Build Docker image for Ray Serve deployment**\n",
    "\n",
    "In this tutorial you need to [build a Docker image for deployment on Anyscale](https://docs.anyscale.com/configuration/dependency-management/dependency-byod/) using the [Dockerfile included in this code repo](./Dockerfile).\n",
    "\n",
    "A custom image is necessary because a system package installed from the workspace terminal, for example with `apt-get install -y podman`, exists only on the Ray head node and isn't propagated to the Ray worker nodes.\n",
    "\n",
    "After building the Docker image, navigate to the **Dependencies** tab in Workspaces, select the image you just created, and set the **BRAVE_API_KEY** environment variable.\n",
    "\n",
    "**Note**\n",
    "\n",
    "This Docker image is provided solely to deploy the MCP services with Ray Serve. Ensure that your MCP Docker images, like `docker.io/mcp/brave-search`, are already published to your own private registry or a public registry.\n",
    "\n",
    "### Common issues\n",
    "\n",
    "1. **FileNotFoundError: [Errno 2] No such file or directory**\n",
    "   - Usually indicates Podman isn't installed correctly. Verify the Podman installation.\n",
    "\n",
    "2. **KeyError: 'BRAVE_API_KEY'**\n",
    "   - Ensure you have exported `BRAVE_API_KEY` in your environment or included it in your dependency configuration. The deployment file raises `RuntimeError` for the same cause if the variable is missing when the file is imported."
   ]
  },
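  {
   "cell_type": "markdown",
   "id": "b3e1f7a2",
   "metadata": {},
   "source": [
    "Both issues above can be caught before deployment with a small preflight check. The following is a minimal sketch rather than part of the tutorial code: `preflight_problems` is a hypothetical helper, and it assumes that finding `podman` on `PATH` is a reasonable proxy for a working installation.\n",
    "\n",
    "```python\n",
    "import os\n",
    "import shutil\n",
    "from typing import List, Mapping, Optional\n",
    "\n",
    "\n",
    "def preflight_problems(environ: Optional[Mapping[str, str]] = None) -> List[str]:\n",
    "    '''Return a list of problems; an empty list means both checks passed.'''\n",
    "    environ = os.environ if environ is None else environ\n",
    "    problems: List[str] = []\n",
    "    # A missing podman binary later surfaces as FileNotFoundError in a replica.\n",
    "    if shutil.which('podman', path=environ.get('PATH')) is None:\n",
    "        problems.append('podman not found on PATH')\n",
    "    # The deployment file reads BRAVE_API_KEY from the environment at import time.\n",
    "    if not environ.get('BRAVE_API_KEY'):\n",
    "        problems.append('BRAVE_API_KEY is not set')\n",
    "    return problems\n",
    "\n",
    "\n",
    "for problem in preflight_problems():\n",
    "    print('preflight:', problem)\n",
    "```\n",
    "\n",
    "Because packages installed from the workspace terminal exist only on the head node, a check that passes there says nothing about the worker nodes. The custom image is still what guarantees Podman is present where replicas run."
   ]
  },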
  {
   "cell_type": "markdown",
   "id": "4136a5ae",
   "metadata": {},
   "source": [
    "## 1. Create the deployment file\n",
    "Save the following code as `multi_mcp_ray_serve.py`: \n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "75728757",
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import logging\n",
    "import os\n",
    "from contextlib import AsyncExitStack\n",
    "from typing import Any, Dict, List, Optional\n",
    "\n",
    "from fastapi import FastAPI, HTTPException, Request\n",
    "from ray import serve\n",
    "from ray.serve.handle import DeploymentHandle\n",
    "\n",
    "from mcp import ClientSession, StdioServerParameters\n",
    "from mcp.client.stdio import stdio_client\n",
    "\n",
    "logger = logging.getLogger(\"multi_mcp_serve\")\n",
    "\n",
    "def _podman_args(\n",
    "    image: str,\n",
    "    *,\n",
    "    extra_args: Optional[List[str]] = None,\n",
    "    env: Optional[Dict[str, str]] = None,\n",
    ") -> List[str]:\n",
    "    args = [\"run\", \"-i\", \"--rm\"]\n",
    "    for key, value in (env or {}).items():\n",
    "        if key.upper() == \"PATH\":\n",
    "            continue\n",
    "        args += [\"-e\", f\"{key}={value}\"]\n",
    "    if extra_args:\n",
    "        args += extra_args\n",
    "    args.append(image)\n",
    "    return args\n",
    "\n",
    "class _BaseMCP:\n",
    "    _PODMAN_ARGS: List[str] = []\n",
    "    _ENV: Dict[str, str] = {}\n",
    "\n",
    "    def __init__(self):\n",
    "        self._ready = asyncio.create_task(self._startup())\n",
    "\n",
    "    async def _startup(self):\n",
    "        params = StdioServerParameters(\n",
    "            command=\"podman\",\n",
    "            args=self._PODMAN_ARGS,\n",
    "            env=self._ENV,\n",
    "        )\n",
    "        self._stack = AsyncExitStack()\n",
    "        stdin, stdout = await self._stack.enter_async_context(stdio_client(params))\n",
    "        self.session = await self._stack.enter_async_context(ClientSession(stdin, stdout))\n",
    "        await self.session.initialize()\n",
    "        logger.info(\"%s replica ready\", type(self).__name__)\n",
    "\n",
    "    async def _ensure_ready(self):\n",
    "        await self._ready\n",
    "\n",
    "    async def list_tools(self) -> List[Dict[str, Any]]:\n",
    "        await self._ensure_ready()\n",
    "        resp = await self.session.list_tools()\n",
    "        return [\n",
    "            {\"name\": t.name, \"description\": t.description, \"input_schema\": t.inputSchema}\n",
    "            for t in resp.tools\n",
    "        ]\n",
    "\n",
    "    async def call_tool(self, tool_name: str, tool_args: Dict[str, Any]) -> Any:\n",
    "        await self._ensure_ready()\n",
    "        return await self.session.call_tool(tool_name, tool_args)\n",
    "\n",
    "    async def __del__(self):\n",
    "        if hasattr(self, \"_stack\"):\n",
    "            await self._stack.aclose()\n",
    "\n",
    "def build_mcp_deployment(\n",
    "    *,\n",
    "    name: str,\n",
    "    docker_image: str,\n",
    "    num_replicas: int = 3,\n",
    "    num_cpus: float = 0.5,\n",
    "    autoscaling_config: Optional[Dict[str, Any]] = None,\n",
    "    server_command: Optional[str] = None,\n",
    "    extra_podman_args: Optional[List[str]] = None,\n",
    "    env: Optional[Dict[str, str]] = None,\n",
    ") -> serve.Deployment:\n",
    "    \"\"\"\n",
    "    - If autoscaling_config is provided, Ray Serve will autoscale between\n",
    "      autoscaling_config['min_replicas'] and ['max_replicas'].\n",
    "    - Otherwise it will launch `num_replicas` fixed replicas.\n",
    "    \"\"\"\n",
    "    deployment_env = env or {}\n",
    "    podman_args = _podman_args(docker_image, extra_args=extra_podman_args, env=deployment_env)\n",
    "    if server_command:\n",
    "        podman_args.append(server_command)\n",
    "\n",
    "    # Build kwargs for the decorator:\n",
    "    deploy_kwargs: Dict[str, Any] = {\n",
    "        \"name\": name,\n",
    "        \"ray_actor_options\": {\"num_cpus\": num_cpus},\n",
    "    }\n",
    "    if autoscaling_config:\n",
    "        deploy_kwargs[\"autoscaling_config\"] = autoscaling_config\n",
    "    else:\n",
    "        deploy_kwargs[\"num_replicas\"] = num_replicas\n",
    "\n",
    "    @serve.deployment(**deploy_kwargs)\n",
    "    class MCP(_BaseMCP):\n",
    "        _PODMAN_ARGS = podman_args\n",
    "        _ENV = deployment_env\n",
    "\n",
    "    return MCP\n",
    "\n",
    "# -------------------------\n",
    "# HTTP router code\n",
    "# -------------------------\n",
    "\n",
    "api = FastAPI()\n",
    "\n",
    "@serve.deployment\n",
    "@serve.ingress(api)\n",
    "class Router:\n",
    "    def __init__(self,\n",
    "                 brave_search: DeploymentHandle,\n",
    "                 fetch: DeploymentHandle) -> None:\n",
    "        self._mcps = {\"brave_search\": brave_search, \"fetch\": fetch}\n",
    "\n",
    "    @api.get(\"/{mcp_name}/tools\")\n",
    "    async def list_tools_http(self, mcp_name: str):\n",
    "        handle = self._mcps.get(mcp_name)\n",
    "        if not handle:\n",
    "            raise HTTPException(404, f\"MCP {mcp_name} not found\")\n",
    "        try:\n",
    "            return {\"tools\": await handle.list_tools.remote()}\n",
    "        except Exception as exc:\n",
    "            logger.exception(\"Listing tools failed\")\n",
    "            raise HTTPException(500, str(exc))\n",
    "\n",
    "    @api.post(\"/{mcp_name}/call\")\n",
    "    async def call_tool_http(self, mcp_name: str, request: Request):\n",
    "        handle = self._mcps.get(mcp_name)\n",
    "        if not handle:\n",
    "            raise HTTPException(404, f\"MCP {mcp_name} not found\")\n",
    "        body = await request.json()\n",
    "        tool_name = body.get(\"tool_name\")\n",
    "        tool_args = body.get(\"tool_args\")\n",
    "        if tool_name is None or tool_args is None:\n",
    "            raise HTTPException(400, \"Missing 'tool_name' or 'tool_args'\")\n",
    "        try:\n",
    "            result = await handle.call_tool.remote(tool_name, tool_args)\n",
    "            return {\"result\": result}\n",
    "        except Exception as exc:\n",
    "            logger.exception(\"Tool call failed\")\n",
    "            raise HTTPException(500, str(exc))\n",
    "\n",
    "# -------------------------\n",
    "# Binding deployments\n",
    "# -------------------------\n",
    "\n",
    "if \"BRAVE_API_KEY\" not in os.environ:\n",
    "    raise RuntimeError(\"BRAVE_API_KEY must be set before `serve run`.\")\n",
    "\n",
    "# Example: autoscaling BraveSearch between 1 and 5 replicas,\n",
    "# targeting ~10 concurrent requests per replica.\n",
    "BraveSearch = build_mcp_deployment(\n",
    "    name=\"brave_search\",\n",
    "    docker_image=\"docker.io/mcp/brave-search\",\n",
    "    env={\"BRAVE_API_KEY\": os.environ[\"BRAVE_API_KEY\"]},\n",
    "    num_cpus=0.2,\n",
    "    autoscaling_config={\n",
    "        \"min_replicas\": 1,\n",
    "        \"max_replicas\": 5,\n",
    "        \"target_ongoing_requests\": 10,\n",
    "    },\n",
    ")\n",
    "\n",
    "# Example: keep Fetch at a fixed 2 replicas.\n",
    "Fetch = build_mcp_deployment(\n",
    "    name=\"fetch\",\n",
    "    docker_image=\"docker.io/mcp/fetch\",\n",
    "    num_replicas=2,\n",
    "    num_cpus=0.2,\n",
    ")\n",
    "\n",
    "brave_search_handle = BraveSearch.bind()\n",
    "fetch_handle = Fetch.bind()\n",
    "app = Router.bind(brave_search_handle, fetch_handle)"
   ]
  },
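  {
   "cell_type": "markdown",
   "id": "d4c9a6e1",
   "metadata": {},
   "source": [
    "To see what each replica executes, the sketch below repeats the logic of `_podman_args` as a standalone function and prints the resulting command line for the Brave Search image. The key value `dummy-key` is a placeholder, not a real key.\n",
    "\n",
    "```python\n",
    "from typing import Dict, List, Optional\n",
    "\n",
    "\n",
    "def podman_args(\n",
    "    image: str,\n",
    "    *,\n",
    "    extra_args: Optional[List[str]] = None,\n",
    "    env: Optional[Dict[str, str]] = None,\n",
    ") -> List[str]:\n",
    "    # Same logic as _podman_args: forward env vars with -e, but never PATH.\n",
    "    args = ['run', '-i', '--rm']\n",
    "    for key, value in (env or {}).items():\n",
    "        if key.upper() == 'PATH':\n",
    "            continue\n",
    "        args += ['-e', f'{key}={value}']\n",
    "    if extra_args:\n",
    "        args += extra_args\n",
    "    args.append(image)\n",
    "    return args\n",
    "\n",
    "\n",
    "cmd = ['podman'] + podman_args(\n",
    "    'docker.io/mcp/brave-search',\n",
    "    env={'BRAVE_API_KEY': 'dummy-key', 'PATH': '/usr/bin'},\n",
    ")\n",
    "print(' '.join(cmd))\n",
    "# podman run -i --rm -e BRAVE_API_KEY=dummy-key docker.io/mcp/brave-search\n",
    "```\n",
    "\n",
    "The `-i` flag keeps stdin open, which the stdio transport needs because the MCP client talks to the server over the container's stdin and stdout. The `--rm` flag removes the container when it exits."
   ]
  },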
  {
   "cell_type": "markdown",
   "id": "faa57dda",
   "metadata": {},
   "source": [
    "You can run the app programmatically to launch it in the workspace:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "451222c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "serve.run(app)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1937b183",
   "metadata": {},
   "source": [
    "Or you can run it from the command line, as shown in the next section."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4fa8f107",
   "metadata": {},
   "source": [
    "**Note:**\n",
    "\n",
    "* On the Ray cluster, use **Podman** instead of Docker to run and manage containers. This approach aligns with the guidelines provided in the [Ray Serve multi-app container deployment documentation](https://docs.ray.io/en/latest/serve/advanced-guides/multi-app-container.html).\n",
    "\n",
    "* Additionally, for images such as `\"docker.io/mcp/brave-search\"`, explicitly include the **`\"docker.io/\"`** prefix to ensure Podman correctly identifies the image URI.\n",
    "\n",
    "* This tutorial passes only the `num_cpus` parameter to `ray_actor_options`. Feel free to modify the code to include additional supported parameters as outlined here:\n",
    "    - https://docs.ray.io/en/latest/serve/resource-allocation.html#\n",
    "\n",
    "* Auto-scaling parameters are provided in `autoscaling_config` as an example. For more details on configuring auto-scaling in Ray Serve deployments, see:\n",
    "    - https://docs.ray.io/en/latest/serve/configure-serve-deployment.html  \n",
    "    - https://docs.ray.io/en/latest/serve/autoscaling-guide.html\n",
    "    - https://docs.ray.io/en/latest/serve/advanced-guides/advanced-autoscaling.html#serve-advanced-autoscaling"
   ]
  },
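  {
   "cell_type": "markdown",
   "id": "e7f2b8c3",
   "metadata": {},
   "source": [
    "As one way to extend `ray_actor_options`, the sketch below factors the keyword-building step of `build_mcp_deployment` into a plain function and adds an optional memory reservation. `build_deploy_kwargs` and its `memory_bytes` parameter are hypothetical names. `memory` is a resource option that Ray actors accept, but confirm the options you need against the resource allocation guide linked above.\n",
    "\n",
    "```python\n",
    "from typing import Any, Dict, Optional\n",
    "\n",
    "\n",
    "def build_deploy_kwargs(\n",
    "    name: str,\n",
    "    *,\n",
    "    num_cpus: float = 0.5,\n",
    "    memory_bytes: Optional[int] = None,\n",
    "    num_replicas: int = 3,\n",
    "    autoscaling_config: Optional[Dict[str, Any]] = None,\n",
    ") -> Dict[str, Any]:\n",
    "    actor_options: Dict[str, Any] = {'num_cpus': num_cpus}\n",
    "    if memory_bytes is not None:\n",
    "        actor_options['memory'] = memory_bytes\n",
    "    kwargs: Dict[str, Any] = {'name': name, 'ray_actor_options': actor_options}\n",
    "    # Autoscaling and a fixed replica count are alternatives, as in the tutorial code.\n",
    "    if autoscaling_config:\n",
    "        kwargs['autoscaling_config'] = autoscaling_config\n",
    "    else:\n",
    "        kwargs['num_replicas'] = num_replicas\n",
    "    return kwargs\n",
    "\n",
    "\n",
    "fixed = build_deploy_kwargs('fetch', num_cpus=0.2, num_replicas=2)\n",
    "scaled = build_deploy_kwargs(\n",
    "    'brave_search',\n",
    "    num_cpus=0.2,\n",
    "    memory_bytes=512 * 1024 * 1024,\n",
    "    autoscaling_config={'min_replicas': 1, 'max_replicas': 5, 'target_ongoing_requests': 10},\n",
    ")\n",
    "print(fixed)\n",
    "print(scaled)\n",
    "```\n",
    "\n",
    "The returned dictionary has the same shape as `deploy_kwargs` in the deployment file, so it could be passed to `serve.deployment(**kwargs)` in the same way."
   ]
  },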
  {
   "cell_type": "markdown",
   "id": "39b65a29",
   "metadata": {},
   "source": [
    "## 2. Run the service with Ray Serve in the workspace\n",
    "\n",
    "You can run the following command in the terminal to deploy the service using Ray Serve:\n",
    "\n",
    "```bash\n",
    "serve run multi_mcp_ray_serve:app\n",
    "```\n",
    "\n",
    "This starts the service on `http://localhost:8000`."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "35894574",
   "metadata": {},
   "source": [
    "## 3. Test the service"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "466ef969",
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "from pprint import pprint\n",
    "\n",
    "# Configuration.\n",
    "BASE_URL = \"http://localhost:8000\"  # Local tooling API base URL\n",
    "\n",
    "def list_tools(service: str):\n",
    "    \"\"\"\n",
    "    Retrieve the list of available tools for a given service.\n",
    "    \"\"\"\n",
    "    url = f\"{BASE_URL}/{service}/tools\"\n",
    "    response = requests.get(url)\n",
    "    response.raise_for_status()\n",
    "    return response.json()[\"tools\"]\n",
    "\n",
    "\n",
    "def call_tool(service: str, tool_name: str, tool_args: dict):\n",
    "    \"\"\"\n",
    "    Invoke a specific tool on a given service with the provided arguments.\n",
    "    \"\"\"\n",
    "    url = f\"{BASE_URL}/{service}/call\"\n",
    "    payload = {\"tool_name\": tool_name, \"tool_args\": tool_args}\n",
    "    response = requests.post(url, json=payload)\n",
    "    response.raise_for_status()\n",
    "    return response.json()[\"result\"]\n",
    "\n",
    "# List Brave Search tools.\n",
    "print(\"=== Brave Search: Available Tools ===\")\n",
    "brave_tools = list_tools(\"brave_search\")\n",
    "pprint(brave_tools)\n",
    "\n",
    "# Run a query via Brave Search.\n",
    "search_tool = brave_tools[0][\"name\"]\n",
    "print(f\"\\nUsing tool '{search_tool}' to search for best tacos in Los Angeles...\")\n",
    "search_result = call_tool(\n",
    "    service=\"brave_search\",\n",
    "    tool_name=search_tool,\n",
    "    tool_args={\"query\": \"best tacos in Los Angeles\"}\n",
    ")\n",
    "print(\"Web Search Results:\")\n",
    "pprint(search_result)\n",
    "\n",
    "# List Fetch tools.\n",
    "print(\"\\n=== Fetch Service: Available Tools ===\")\n",
    "fetch_tools = list_tools(\"fetch\")\n",
    "pprint(fetch_tools)\n",
    "\n",
    "# Fetch a URL.\n",
    "fetch_tool = fetch_tools[0][\"name\"]\n",
    "print(f\"\\nUsing tool '{fetch_tool}' to fetch https://example.com...\")\n",
    "fetch_result = call_tool(\n",
    "    service=\"fetch\",\n",
    "    tool_name=fetch_tool,\n",
    "    tool_args={\"url\": \"https://example.com\"}\n",
    ")\n",
    "print(\"Fetch Results:\")\n",
    "pprint(fetch_result)\n"
   ]
  },
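  {
   "cell_type": "markdown",
   "id": "f1a5d0b6",
   "metadata": {},
   "source": [
    "When a test call fails, the HTTP status shows which check in the router rejected it. The sketch below restates the checks from `call_tool_http` as a plain function, without Ray or FastAPI. `router_rejection_status` and the tool name `some_tool` are hypothetical.\n",
    "\n",
    "```python\n",
    "from typing import Any, Dict, Optional\n",
    "\n",
    "KNOWN_SERVICES = ('brave_search', 'fetch')\n",
    "\n",
    "\n",
    "def router_rejection_status(mcp_name: str, body: Dict[str, Any]) -> Optional[int]:\n",
    "    '''Mirror the Router checks; None means the request reaches the MCP replica.'''\n",
    "    # Unknown service names are rejected first.\n",
    "    if mcp_name not in KNOWN_SERVICES:\n",
    "        return 404\n",
    "    # Both keys must be present; an empty dict for tool_args is accepted.\n",
    "    if body.get('tool_name') is None or body.get('tool_args') is None:\n",
    "        return 400\n",
    "    return None\n",
    "\n",
    "\n",
    "print(router_rejection_status('maps', {}))  # 404\n",
    "print(router_rejection_status('fetch', {'tool_name': 'some_tool'}))  # 400\n",
    "print(router_rejection_status('fetch', {'tool_name': 'some_tool', 'tool_args': {}}))  # None\n",
    "```\n",
    "\n",
    "A request that passes both checks can still return 500 if the MCP call itself raises, because the router wraps downstream exceptions in `HTTPException(500, ...)`."
   ]
  },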
  {
   "cell_type": "markdown",
   "id": "bebf93b2",
   "metadata": {},
   "source": [
    "## 4. Production deployment with Anyscale service\n",
    "\n",
    "For production deployment, use Anyscale services to deploy the Ray Serve app to a dedicated cluster without modifying the code. Anyscale ensures scalability, fault tolerance, and load balancing, keeping the service resilient against node failures, high traffic, and rolling updates.\n",
    "\n",
    "Use the following command to deploy the service:"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9e6d1861",
   "metadata": {},
   "source": [
    "```bash\n",
    "anyscale service deploy multi_mcp_ray_serve:app --name=multi_mcp_tool_service\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8ceff16e",
   "metadata": {},
   "source": [
    "**Note:**\n",
    "\n",
    "This Anyscale Service pulls the associated dependencies, compute config, and service config from the workspace. To define these explicitly, you can deploy from a `config.yaml` file using the `-f` flag. See [ServiceConfig reference](https://docs.anyscale.com/reference/service-api/#serviceconfig) for details."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "22ee0406",
   "metadata": {},
   "source": [
    "## 5. Query the production service\n",
    "\n",
    "When you deploy, you expose the service to a publicly accessible IP address that you can send requests to.\n",
    "\n",
    "From the output of the preceding deploy command, copy your BASE_URL and TOKEN. As an example, the values look like the following:\n",
    "\n",
    "* BASE_URL = \"https://multi-mcp-tool-service-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com\"\n",
    "* TOKEN = \"z3RIKzZwHDF9sV60o7M48WsOY1Z50dsXDrWRbxHYtPQ\"\n",
    "\n",
    "Replace the placeholder values for `BASE_URL` and `TOKEN` in the following Python script:"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "225ca572",
   "metadata": {},
   "source": [
    "```python\n",
    "import requests\n",
    "from pprint import pprint\n",
    "\n",
    "# Configuration\n",
    "BASE_URL = \"https://multi-mcp-tool-service-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com\"  # Replace with your own URL\n",
    "TOKEN = \"z3RIKzZwHDF9sV60o7M48WsOY1Z50dsXDrWRbxHYtPQ\" # Replace with your own token\n",
    "\n",
    "\n",
    "HEADERS = {\n",
    "    \"Authorization\": f\"Bearer {TOKEN}\"\n",
    "}\n",
    "\n",
    "def list_tools(service: str):\n",
    "    \"\"\"\n",
    "    Retrieve the list of available tools for a given service.\n",
    "    \"\"\"\n",
    "    url = f\"{BASE_URL}/{service}/tools\"\n",
    "    response = requests.get(url, headers=HEADERS)\n",
    "    response.raise_for_status()\n",
    "    return response.json()[\"tools\"]\n",
    "\n",
    "def call_tool(service: str, tool_name: str, tool_args: dict):\n",
    "    \"\"\"\n",
    "    Invoke a specific tool on a given service with the provided arguments.\n",
    "    \"\"\"\n",
    "    url = f\"{BASE_URL}/{service}/call\"\n",
    "    payload = {\"tool_name\": tool_name, \"tool_args\": tool_args}\n",
    "    response = requests.post(url, json=payload, headers=HEADERS)\n",
    "    response.raise_for_status()\n",
    "    return response.json()[\"result\"]\n",
    "\n",
    "# List Brave Search tools.\n",
    "print(\"=== Brave Search: Available Tools ===\")\n",
    "brave_tools = list_tools(\"brave_search\")\n",
    "pprint(brave_tools)\n",
    "\n",
    "# Perform a search for \"best tacos in Los Angeles\".\n",
    "search_tool = brave_tools[0][\"name\"]\n",
    "print(f\"\\nUsing tool '{search_tool}' to search for best tacos in Los Angeles...\")\n",
    "search_result = call_tool(\n",
    "    service=\"brave_search\",\n",
    "    tool_name=search_tool,\n",
    "    tool_args={\"query\": \"best tacos in Los Angeles\"}\n",
    ")\n",
    "print(\"Web Search Results:\")\n",
    "pprint(search_result)\n",
    "\n",
    "# List Fetch tools.\n",
    "print(\"\\n=== Fetch Service: Available Tools ===\")\n",
    "fetch_tools = list_tools(\"fetch\")\n",
    "pprint(fetch_tools)\n",
    "\n",
    "# Fetch the content of example.com\n",
    "fetch_tool = fetch_tools[0][\"name\"]\n",
    "print(f\"\\nUsing tool '{fetch_tool}' to fetch https://example.com...\")\n",
    "fetch_result = call_tool(\n",
    "    service=\"fetch\",\n",
    "    tool_name=fetch_tool,\n",
    "    tool_args={\"url\": \"https://example.com\"}\n",
    ")\n",
    "print(\"Fetch Results:\")\n",
    "pprint(fetch_result)\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
